package com.dec.kks.etl.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Utility class for creating Kafka consumers/producers from a flat
 * configuration map and for sending messages synchronously.
 *
 * <p>All methods are static; this class is not meant to be instantiated.
 */
public class DECKafkaUtil {

    private static final Logger LOG = Logger.getLogger(DECKafkaUtil.class.getName());

    // Keys expected in the configuration maps passed to initConsumer/initProducer.
    // They mirror the standard Kafka client property names.
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String GROUP_ID_CONFIG = "group.id";
    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String ACKS_CONFIG = "acks";
    public static final String RETRIES_CONFIG = "retries";
    public static final String LINGER_MS_CONFIG = "linger.ms";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    public static final String MAXRATEPERPARTITION_CONFIG = "maxRatePerPartition";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";

    /** Utility class — prevent instantiation. */
    private DECKafkaUtil() {
    }

    /**
     * Builds a {@link KafkaConsumer} from the supplied configuration map.
     *
     * <p>The map is expected to contain values for the consumer-related keys
     * declared on this class (bootstrap servers, group id, deserializers,
     * offset/commit settings). Missing keys are put into the {@link Properties}
     * as {@code null}, which will cause the Kafka client to reject the config
     * at construction time.
     *
     * @param props configuration values keyed by the constants on this class
     * @return a new {@code KafkaConsumer<String, String>} (caller is responsible for closing it)
     */
    public static KafkaConsumer<String, String> initConsumer(Map<Object, Object> props) {
        Properties config = new Properties();
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.get(AUTO_OFFSET_RESET_CONFIG));
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, props.get(MAX_POLL_RECORDS_CONFIG));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, props.get(GROUP_ID_CONFIG));
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.get(BOOTSTRAP_SERVERS_CONFIG));
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, props.get(SESSION_TIMEOUT_MS_CONFIG));
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.get(VALUE_DESERIALIZER_CLASS_CONFIG));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.get(KEY_DESERIALIZER_CLASS_CONFIG));
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, props.get(AUTO_COMMIT_INTERVAL_MS_CONFIG));
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, props.get(ENABLE_AUTO_COMMIT_CONFIG));
        return new KafkaConsumer<>(config);
    }

    /**
     * Builds a {@link KafkaProducer} from the supplied configuration map.
     *
     * <p>A unique {@code client.id} of the form {@code producer-<epochMillis>}
     * is generated per call; any client id present in {@code props} is ignored.
     *
     * @param props configuration values keyed by the constants on this class
     * @return a new {@code KafkaProducer<String, String>} (caller is responsible for closing it)
     */
    public static KafkaProducer<String, String> initProducer(Map<String, Object> props) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.get(BOOTSTRAP_SERVERS_CONFIG));
        config.put(ProducerConfig.ACKS_CONFIG, props.get(ACKS_CONFIG));
        config.put(ProducerConfig.RETRIES_CONFIG, props.get(RETRIES_CONFIG));
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, props.get(BATCH_SIZE_CONFIG));
        config.put(ProducerConfig.LINGER_MS_CONFIG, props.get(LINGER_MS_CONFIG));
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, props.get(BUFFER_MEMORY_CONFIG));
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props.get(KEY_SERIALIZER_CLASS_CONFIG));
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props.get(VALUE_SERIALIZER_CLASS_CONFIG));
        config.put(ProducerConfig.SEND_BUFFER_CONFIG, props.get(SEND_BUFFER_CONFIG));
        config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, props.get(MAX_REQUEST_SIZE_CONFIG));
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + System.currentTimeMillis());
        return new KafkaProducer<>(config);
    }

    /**
     * Sends a message and blocks until the broker acknowledges it.
     *
     * <p>Empty or {@code null} messages are silently skipped (returns {@code false}).
     *
     * @param producer the producer to send with (raw type kept for caller compatibility)
     * @param topic    destination topic
     * @param key      record key (may be {@code null})
     * @param msg      record value; must be non-empty for a send to occur
     * @return {@code true} if the broker acknowledged the record, {@code false} otherwise
     */
    @SuppressWarnings("unchecked")
    public static boolean sendMsg(KafkaProducer producer, String topic, String key, String msg) {
        if (StringUtils.isEmpty(msg)) {
            return false;
        }
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, msg);
        try {
            Future<RecordMetadata> fu = producer.send(record);
            // Block until the send completes; throws on broker-side failure.
            fu.get();
            // BUGFIX: previously the success flag was never set, so this
            // method always returned false even on successful sends.
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.WARNING, "Interrupted while sending to topic " + topic, e);
        } catch (ExecutionException e) {
            LOG.log(Level.WARNING, "Failed to send message to topic " + topic, e);
        }
        return false;
    }

}
